package net.gdface.facelog;

import gu.simplemq.Channel;
import gu.simplemq.IMQConnParameterSupplier;
import gu.simplemq.IMessageQueueFactory;
import gu.simplemq.MessageQueueFactorys;
import gu.simplemq.MessageQueueType;
import gu.simplemq.exceptions.SmqUnsubscribeException;
import gu.simplemq.json.BaseJsonEncoder;
import gu.simplemq.redis.JedisPoolLazy;
import gu.simplemq.redis.RedisFactory;
import gu.simplemq.redis.RedisTable;
import gu.sql2java.exception.ObjectRetrievalException;
import net.gdface.facelog.mq.DeviceHeartbeatListener;
import net.gdface.facelog.mq.DeviceHeartdbeatPackage;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Message system management.
 * <p>
 * Obtains the message queue factory for the configured implementation type,
 * initializes the message service controller when one applies, and subscribes
 * to the device heartbeat monitor channel so that received heartbeat packages
 * are written to the heartbeat table (REDIS).
 *
 * @author guyadong
 */
class MessageQueueManagement implements ServiceConstant,ChannelConstant,IMQConnParameterSupplier{
	private static final Logger logger = LoggerFactory.getLogger(MessageQueueManagement.class);

	public static final String MQ_CONN_HOST = "host";
	public static final String MQ_CONN_PORT = "port";
	public static final String MQ_CONN_URI = "uri";
	private final DaoManagement dm;
	private final RedisManagement rm;
	/** Device heartbeat table (REDIS) */
	private final RedisTable<DeviceHeartdbeatPackage> table;
	/** Device heartbeat message channel */
	private final Channel<DeviceHeartdbeatPackage> monitorChannel;
	/** Message system parameters defined in the configuration file */
	private final Map<MQParam, String> messageQueueConfig;
	private IMessageQueueFactory factory;
	/** Message service controller (lazily initialized) */
	private volatile BaseMQController mqController;

	/**
	 * Builds the message system manager and registers the heartbeat monitor channel
	 * on the subscriber of the message queue factory.
	 *
	 * @param rm REDIS management instance, source of the REDIS connection parameters
	 * @param dm DAO management instance, used to look up devices for received heartbeats
	 * @throws NullPointerException if no heartbeat monitor channel name
	 *         ({@link MQParam#HB_MONITOR_CHANNEL}) is present in the message queue parameters
	 */
	MessageQueueManagement(RedisManagement rm,DaoManagement dm) {
		this.dm = dm;
		this.rm = rm;
		this.messageQueueConfig = GlobalConfig.makeMessageQueueMqConfig();
		this.factory = MessageQueueFactorys.getFactory(GlobalConfig.getMessageQueueType());
		initMessageQueueControllerIfNeeded();
		// RedisManagement has already created the default JedisPoolLazy instance,
		// so it can simply be fetched here
		this.table = RedisFactory.getTable(TABLE_HEARTBEAT, JedisPoolLazy.getDefaultInstance());
		this.table.setExpire(DEFAULT_HEARTBEAT_EXPIRE, TimeUnit.SECONDS);
		// fail fast with a meaningful message when the monitor channel name is missing
		String channelName = checkNotNull(getMessageQueueParameters().get(MQParam.HB_MONITOR_CHANNEL),
				"%s NOT DEFINED",MQParam.HB_MONITOR_CHANNEL);
		this.monitorChannel = new Channel<>(channelName,DeviceHeartdbeatPackage.class,new Listener());
		this.factory.asDefaultFactory().getSubscriber().register(monitorChannel);
	}

	/**
	 * Returns the message system parameter configuration.
	 * <p>
	 * The result is a new map holding the configured message queue parameters
	 * overlaid with the parameters returned by {@code rm.getRedisParameters()};
	 * modifying it does not affect this object.
	 *
	 * @return mapping of message system parameter name to value
	 */
	Map<MQParam,String> getMessageQueueParameters(){
		HashMap<MQParam, String> merged = Maps.newHashMap(messageQueueConfig);
		// REDIS parameters are applied last, so they win on key collisions
		merged.putAll(rm.getRedisParameters());
		return merged;
	}

	/**
	 * @return implementation type reported by the current message queue factory
	 */
	@Override
	public MessageQueueType getImplType() {
		return factory.getImplType();
	}

	/**
	 * @return host and port reported by the current message queue factory
	 */
	@Override
	public HostAndPort getHostAndPort() {
		return factory.getHostAndPort();
	}

	/**
	 * @return connection parameters reported by the current message queue factory
	 */
	@Override
	public Map<String, Object> getMQConnParameters() {
		return factory.getMQConnParameters();
	}

	/**
	 * Sets the expiry time of the data in the device heartbeat table.
	 * The call is ignored when {@code duration} is not positive.
	 *
	 * @param duration expiry duration, only applied when greater than 0
	 * @param unit time unit of {@code duration}
	 * @return this object
	 * @throws NullPointerException if {@code duration} is positive and {@code unit} is null
	 */
	public MessageQueueManagement setHBTableExpire(long duration, TimeUnit unit){
		if(duration > 0){
			this.table.setExpire(duration, checkNotNull(unit,"unit is null"));
		}
		return this;
	}

	/**
	 * Initializes the message service controller if needed (intended to run only once).
	 * <p>
	 * For the REDIS implementation no controller is created: the factory is initialized
	 * from the REDIS connection parameters and {@code mqController} stays {@code null}.
	 * For ACTIVEMQ a controller is created and initialized; the factory may then be
	 * replaced by the controller's factory and the {@link MQParam#MQ_CONNECT} entry of
	 * the message queue configuration is refreshed with the factory's connection parameters.
	 */
	private void initMessageQueueControllerIfNeeded(){
		// check again under the lock before initializing
		if(mqController == null){
			synchronized (this) {
				if(mqController == null){
					switch(factory.getImplType()){
					case REDIS:
						/* When implType is REDIS, initialize with the REDIS connection parameters,
						 * ignoring the connection parameters defined by 'mq.connect.' in the configuration file */
						if(!factory.initialized()){
							factory.init(rm.getMQConnParameters());
						}
						break;
					case ACTIVEMQ:
						mqController = ActivemqController.makeActivemqController();
						break;
					default:
						break;
					}
					if(mqController != null){
						boolean needUpdate = mqController.init();
						if(needUpdate){
							this.factory = mqController.factory;
						}
						// update the connection parameters
						String json = BaseJsonEncoder.getEncoder()
								.toJsonString(this.factory.getMQConnParameters());
						messageQueueConfig.put(MQParam.MQ_CONNECT, json);
					}
				}
			}
		}
	}

	/**
	 * @return the message queue factory currently in use
	 */
	public IMessageQueueFactory getFactory() {
		return factory;
	}

	/**
	 * Heartbeat listener registered on the monitor channel; stores every received
	 * heartbeat package in the heartbeat table keyed by the device's MAC address.
	 */
	private class Listener implements DeviceHeartbeatListener{

		/**
		 * Handles one heartbeat package. Failures are logged and not rethrown.
		 *
		 * @param t received heartbeat package
		 */
		@Override
		public void onSubscribe(DeviceHeartdbeatPackage t) throws SmqUnsubscribeException {
			try {
				/* write the heartbeat data to the redis table */
				String hardwareAddress = checkNotNull(dm.daoGetDeviceChecked(t.getDeviceId()).getMac(),"MAC is null from heartbeat package");
				table.set(hardwareAddress,t, false);
				// NOTE(review): presumably applies the table's configured expiry to this key - confirm
				table.expire(hardwareAddress);
			} catch (ObjectRetrievalException e) {
				logger.error("NOT FOUND DEVICE DATA FOR ID={}", t.getDeviceId());
			} catch (Exception e) {
				// pass the throwable so the stack trace is kept in the log
				logger.error(e.toString(), e);
			}
		}

	}
}
